Source File
lazyClient.go
Belonging Package
github.com/multiformats/go-multistream
package multistreamimport ()// NewMSSelect returns a new Multistream which is able to perform// protocol selection with a MultistreamMuxer.func [ StringLike]( io.ReadWriteCloser, ) LazyConn {return &lazyClientConn[]{protos: []{ProtocolID, },con: ,rhandshakeOnce: newOnce(),whandshakeOnce: newOnce(),}}// NewMultistream returns a multistream for the given protocol. This will not// perform any protocol selection. If you are using a MultistreamMuxer, use// NewMSSelect.func [ StringLike]( io.ReadWriteCloser, ) LazyConn {return &lazyClientConn[]{protos: []{},con: ,rhandshakeOnce: newOnce(),whandshakeOnce: newOnce(),}}// once is a sync.Once that can be used by synctest.// For Multistream, it is a bit better than sync.Once because it doesn't// spin when acquiring the lock.type once struct {sem chan struct{}}func newOnce() *once {:= once{sem: make(chan struct{}, 1),}.sem <- struct{}{}return &}func ( *once) ( func()) {// We only ever pull a single value from the channel. But we want to block// Do until the first call to Do has completed. The first call will close// the channel, so by checking if it's closed we know we don't need to do// anything., := <-.semif ! {return}defer close(.sem)()}// lazyClientConn is a ReadWriteCloser adapter that lazily negotiates a protocol// using multistream-select on first use.//// It *does not* block writes waiting for the other end to respond. Instead, it// simply assumes the negotiation went successfully and starts writing data.// See: https://github.com/multiformats/go-multistream/issues/20type lazyClientConn[ StringLike] struct {// Used to ensure we only trigger the write half of the handshake once.rhandshakeOnce *oncererr error// Used to ensure we only trigger the read half of the handshake once.whandshakeOnce *oncewerr error// The sequence of protocols to negotiate.protos []// The inner connection.con io.ReadWriteCloser}// Read reads data from the io.ReadWriteCloser.//// If the protocol hasn't yet been negotiated, this method triggers the write// half of the handshake and then waits for the read half to complete.//// It returns an error if the read half of the handshake fails.func ( *lazyClientConn[]) ( []byte) (int, error) {.rhandshakeOnce.Do(func() {go .whandshakeOnce.Do(.doWriteHandshake).doReadHandshake()})if .rerr != nil {return 0, .rerr}if len() == 0 {return 0, nil}return .con.Read()}func ( *lazyClientConn[]) () {for , := range .protos {// read protocol, := ReadNextToken[](.con)if != nil {.rerr =return}if == "na" {.rerr = ErrNotSupported[]{[]{}}return}if != {.rerr = fmt.Errorf("protocol mismatch in lazy handshake ( %s != %s )", , )return}}}func ( *lazyClientConn[]) () {.doWriteHandshakeWithData(nil)}// Perform the write handshake but *also* write some extra data.func ( *lazyClientConn[]) ( []byte) int {:= getWriter(.con)defer putWriter()for , := range .protos {.werr = delimWrite(, []byte())if .werr != nil {return 0}}:= 0if len() > 0 {, .werr = .Write()if .werr != nil {return}}.werr = .Flush()return}// Write writes the given buffer to the underlying connection.//// If the protocol has not yet been negotiated, write waits for the write half// of the handshake to complete triggers (but does not wait for) the read half.//// Write *also* ignores errors from the read half of the handshake (in case the// stream is actually write only).func ( *lazyClientConn[]) ( []byte) (int, error) {:= 0.whandshakeOnce.Do(func() {go .rhandshakeOnce.Do(.doReadHandshake)= .doWriteHandshakeWithData()})if .werr != nil || > 0 {return , .werr}return .con.Write()}// Close closes the underlying io.ReadWriteCloser after finishing the handshake.func ( *lazyClientConn[]) () error {// As the client, we flush the handshake on close to cover an// interesting edge-case where the server only speaks a single protocol// and responds eagerly with that protocol before waiting for out// handshake.//// Again, we must not read the error because the other end may have// closed the stream for reading. I mean, we're the initiator so that's// strange... but it's still allowed_ = .Flush()// Finish reading the handshake before we close the connection/stream. This// is necessary so that the other side can finish sending its response to our// multistream header before we tell it we are done reading.//// Example:// We open a QUIC stream, write the protocol `/a`, send 1 byte of application// data, and immediately close.//// This can result in a single packet that contains the stream data along// with a STOP_SENDING frame. The other side may be unable to negotiate// multistream select since it can't write to the stream anymore and may// drop the stream.//// Note: We currently handle this case in Go(https://github.com/multiformats/go-multistream/pull/87), but rust-libp2p does not..rhandshakeOnce.Do(.doReadHandshake)return .con.Close()}// Flush sends the handshake.func ( *lazyClientConn[]) () error {.whandshakeOnce.Do(func() {go .rhandshakeOnce.Do(.doReadHandshake).doWriteHandshake()})return .werr}
![]() |
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds. |